A skewed dataset is a dataset with a class imbalance; this leads to poor-performing or failing Spark jobs that often die with an OOM (out of memory) error.

When performing a join onto a skewed dataset, the imbalance is usually on the key(s) on which the join is performed. As a result, the majority of the data falls onto a single partition, which takes much longer to complete than the other partitions.

Some hints for detecting skewness are:

  1. The key(s) consist mainly of null values, which all fall onto a single partition.
  2. A small subset of values for the key(s) makes up a high percentage of the total keys, and these all fall onto a single partition.

We will go through both of these cases and see how we can combat them.
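As a starting point, here is a minimal, self-contained sketch (not part of the original walkthrough; key_distribution is a hypothetical helper) of how you might surface a skewed key: count the rows per key value and look at the fraction of the total held by the heaviest values.

from pyspark.sql import functions as F

def key_distribution(df, key):
    """Show the heaviest values of `key` and the fraction of rows they hold."""
    total = df.count()
    return (
        df.groupBy(key)
          .count()                                        # rows per key value
          .withColumn("fraction", F.col("count") / total)
          .orderBy(F.desc("count"))                       # heaviest keys first
    )

# e.g. key_distribution(orders, "customer_id").show(5)
# A single value (or a large null bucket) holding most of the rows signals skew.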

Library Imports

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

Template

spark = (
    SparkSession.builder
    .master("local")
    .appName("Exploring Joins")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

sc = spark.sparkContext

Situation 2: High Frequency Keys

Initial Datasets

customers = spark.createDataFrame([
    (1, "John"), 
    (2, "Bob"),
], ["customer_id", "first_name"])

customers.toPandas()
   customer_id first_name
0            1       John
1            2        Bob

orders = spark.createDataFrame([
    (i, 1 if i < 95 else 2, "order #{}".format(i)) for i in range(100) 
], ["id", "customer_id", "order_name"])

orders.toPandas().tail(6)
    id  customer_id order_name
94  94            1  order #94
95  95            2  order #95
96  96            2  order #96
97  97            2  order #97
98  98            2  order #98
99  99            2  order #99

Option 1: Inner Join

df = customers.join(orders, "customer_id")

df.toPandas().tail(10)
    customer_id first_name  id order_name
90            1       John  90  order #90
91            1       John  91  order #91
92            1       John  92  order #92
93            1       John  93  order #93
94            1       John  94  order #94
95            2        Bob  95  order #95
96            2        Bob  96  order #96
97            2        Bob  97  order #97
98            2        Bob  98  order #98
99            2        Bob  99  order #99

df.explain()
== Physical Plan ==
*(5) Project [customer_id#122L, first_name#123, id#126L, order_name#128]
+- *(5) SortMergeJoin [customer_id#122L], [customer_id#127L], Inner
   :- *(2) Sort [customer_id#122L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(customer_id#122L, 200)
   :     +- *(1) Filter isnotnull(customer_id#122L)
   :        +- Scan ExistingRDD[customer_id#122L,first_name#123]
   +- *(4) Sort [customer_id#127L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(customer_id#127L, 200)
         +- *(3) Filter isnotnull(customer_id#127L)
            +- Scan ExistingRDD[id#126L,customer_id#127L,order_name#128]

What Happened:

  • We want to find what orders each customer made, so we will be joining the customers table to the orders table.
  • When performing the join, the data is shuffled with a hashpartitioning on customer_id (the Exchange step in the physical plan).
  • Because of how we created the data, 95% of the orders (all of customer 1's) land on a single partition.

Results:

  • Similar to the Null Skew case, this means that single task/partition will take much longer than the others and will most likely error out.
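One way to see this imbalance directly (a hypothetical check, not part of the original walkthrough) is to repartition the orders by the join key, mirroring the Exchange hashpartitioning step from the plan, and count the rows that land in each partition:

sizes = (
    orders
    .repartition(200, "customer_id")  # same partitioning the join performs
    .rdd
    .glom()                           # one list of rows per partition
    .map(len)                         # size of each partition
    .collect()
)

print(sorted(sizes, reverse=True)[:3])
# The largest partition holds 95 of the 100 orders (or all 100 if both
# customer_ids hash to the same partition); nearly every other partition is empty.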

Option 2: Split the DataFrame into 2 Sections, High Frequency and Non-High Frequency Values
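A minimal sketch of the general idea behind this option, assuming the high-frequency keys are already known (here, customer_id 1); this illustrates the splitting approach and is not necessarily the exact steps used in this guide: split the orders into a high-frequency slice and a remainder, join each slice separately, and union the results.

high_freq_keys = [1]  # assumed known high-frequency key(s)

orders_high = orders.where(F.col("customer_id").isin(high_freq_keys))
orders_rest = orders.where(~F.col("customer_id").isin(high_freq_keys))

# Join the heavy slice with a broadcast hint so its rows are not shuffled
# onto a single partition, join the remainder normally, then union the two.
df_high = orders_high.join(F.broadcast(customers), "customer_id")
df_rest = orders_rest.join(customers, "customer_id")

df = df_high.unionByName(df_rest)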
